
Commit

Merge 854e553 into 1f88156
edgibbs committed Jan 18, 2015
2 parents 1f88156 + 854e553 commit 1b4b9ef
Showing 4 changed files with 88 additions and 12 deletions.
14 changes: 14 additions & 0 deletions README.md
@@ -294,6 +294,20 @@ NewsletterJob = Struct.new(:text, :emails) do
end
```

To set a per-job default for destroying failed jobs that overrides Delayed::Worker.destroy_failed_jobs, you can define a destroy_failed_jobs method on the job

```ruby
NewsletterJob = Struct.new(:text, :emails) do
  def perform
    emails.each { |e| NewsletterMailer.deliver_text_to_email(text, e) }
  end

  def destroy_failed_jobs
    false
  end
end
```

To set a default queue name for a custom job that overrides Delayed::Worker.default_queue_name, you can define a queue_name method on the job

```ruby
4 changes: 4 additions & 0 deletions lib/delayed/backend/base.rb
@@ -139,6 +139,10 @@ def max_run_time
end
end

def destroy_failed_jobs
  payload_object.destroy_failed_jobs if payload_object.respond_to?(:destroy_failed_jobs)
end

def fail!
  update_attributes(:failed_at => self.class.db_time_now)
end
70 changes: 63 additions & 7 deletions lib/delayed/backend/shared_spec.rb
@@ -450,6 +450,30 @@ def create_job(opts = {})
end
end

describe 'destroy_failed_jobs' do
  before(:each) do
    @job = described_class.enqueue SimpleJob.new
  end

  it 'is not defined' do
    expect(@job.destroy_failed_jobs).to be_nil
  end

  it 'results in the default destroy failed jobs setting when not defined' do
    expect(worker.destroy_failed_jobs(@job)).to eq(Delayed::Worker::DEFAULT_DESTROY_FAILED_JOBS)
  end

  it 'uses the destroy failed jobs value on the payload when defined' do
    expect(@job.payload_object).to receive(:destroy_failed_jobs).and_return(true)
    expect(@job.destroy_failed_jobs).to be_truthy
  end

  it 'results in an overridden destroy failed jobs value when defined' do
    expect(@job.payload_object).to receive(:destroy_failed_jobs).and_return(true).twice
    expect(worker.destroy_failed_jobs(@job)).to be_truthy
  end
end

describe 'yaml serialization' do
context 'when serializing jobs' do
it 'raises error ArgumentError for new records' do
@@ -513,6 +537,7 @@ def create_job(opts = {})
it 'marks the job as failed' do
  Delayed::Worker.destroy_failed_jobs = false
  job = described_class.create! :handler => '--- !ruby/object:JobThatDoesNotExist {}'
  expect(job).to receive(:destroy_failed_jobs).and_return(false).twice
  worker.work_off
  job.reload
  expect(job).to be_failed
@@ -617,13 +642,24 @@ def create_job(opts = {})
end

context 'and we want to destroy jobs' do
  after do
    Delayed::Worker.destroy_failed_jobs = true
  end

  it_behaves_like 'any failure more than Worker.max_attempts times'

  it 'is destroyed if it failed more than Worker.max_attempts times' do
    expect(@job).to receive(:destroy)
    Delayed::Worker.max_attempts.times { worker.reschedule(@job) }
  end

  it 'is destroyed if the job has destroy failed jobs set' do
    Delayed::Worker.destroy_failed_jobs = false
    expect(@job).to receive(:destroy_failed_jobs).and_return(true).twice
    expect(@job).to receive(:destroy)
    Delayed::Worker.max_attempts.times { worker.reschedule(@job) }
  end

  it 'is not destroyed if failed fewer than Worker.max_attempts times' do
    expect(@job).not_to receive(:destroy)
    (Delayed::Worker.max_attempts - 1).times { worker.reschedule(@job) }
@@ -641,15 +677,35 @@ def create_job(opts = {})

it_behaves_like 'any failure more than Worker.max_attempts times'

it 'is failed if it failed more than Worker.max_attempts times' do
expect(@job.reload).not_to be_failed
Delayed::Worker.max_attempts.times { worker.reschedule(@job) }
expect(@job.reload).to be_failed
context 'and destroy failed jobs is false' do
it 'is failed if it failed more than Worker.max_attempts times' do
expect(@job.reload).not_to be_failed
Delayed::Worker.max_attempts.times { worker.reschedule(@job) }
expect(@job.reload).to be_failed
end

it 'is not failed if it failed fewer than Worker.max_attempts times' do
(Delayed::Worker.max_attempts - 1).times { worker.reschedule(@job) }
expect(@job.reload).not_to be_failed
end
end

it 'is not failed if it failed fewer than Worker.max_attempts times' do
(Delayed::Worker.max_attempts - 1).times { worker.reschedule(@job) }
expect(@job.reload).not_to be_failed
context 'and destroy failed jobs for job is false' do
before do
Delayed::Worker.destroy_failed_jobs = true
end

it 'is failed if it failed more than Worker.max_attempts times' do
expect(@job).to receive(:destroy_failed_jobs).and_return(false).twice
expect(@job.reload).not_to be_failed
Delayed::Worker.max_attempts.times { worker.reschedule(@job) }
expect(@job.reload).to be_failed
end

it 'is not failed if it failed fewer than Worker.max_attempts times' do
(Delayed::Worker.max_attempts - 1).times { worker.reschedule(@job) }
expect(@job.reload).not_to be_failed
end
end
end
end
12 changes: 7 additions & 5 deletions lib/delayed/worker.rb
@@ -16,6 +16,7 @@ class Worker # rubocop:disable ClassLength
DEFAULT_DELAY_JOBS = true
DEFAULT_QUEUES = []
DEFAULT_READ_AHEAD = 5
DEFAULT_DESTROY_FAILED_JOBS = true

cattr_accessor :min_priority, :max_priority, :max_attempts, :max_run_time,
:default_priority, :sleep_delay, :logger, :delay_jobs, :queues,
@@ -39,17 +40,14 @@ def self.reset
self.delay_jobs = DEFAULT_DELAY_JOBS
self.queues = DEFAULT_QUEUES
self.read_ahead = DEFAULT_READ_AHEAD
self.destroy_failed_jobs = DEFAULT_DESTROY_FAILED_JOBS
end

reset

# Add or remove plugins in this list before the worker is instantiated
self.plugins = [Delayed::Plugins::ClearLocks]

# By default failed jobs are destroyed after too many attempts. If you want to keep them around
# (perhaps to inspect the reason for the failure), set this to false.
self.destroy_failed_jobs = true

# By default, Signals INT and TERM set @exit, and the worker exits upon completion of the current job.
# If you would prefer to raise a SignalException and exit immediately you can use this.
# Be aware daemons uses TERM to stop and restart
@@ -236,7 +234,7 @@ def failed(job)
say "Error when running failure callback: #{error}", 'error'
say error.backtrace.join("\n"), 'error'
ensure
self.class.destroy_failed_jobs ? job.destroy : job.fail!
destroy_failed_jobs(job) ? job.destroy : job.fail!
end
end
end
@@ -265,6 +263,10 @@ def max_run_time(job)
job.max_run_time || self.class.max_run_time
end

def destroy_failed_jobs(job)
  job.destroy_failed_jobs.nil? ? self.class.destroy_failed_jobs : job.destroy_failed_jobs
end

protected

def handle_failed_job(job, error)
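Taken together, the new Worker#destroy_failed_jobs(job) helper resolves the setting in two steps: it asks the job (which in turn asks its payload and returns nil when the payload defines no destroy_failed_jobs hook), and it falls back to the class-level Delayed::Worker.destroy_failed_jobs when the answer is nil. Below is a minimal, self-contained sketch of that precedence rule; SketchJob, SketchWorker, KeepMeJob and PlainJob are hypothetical stand-ins for illustration, not classes from this commit or from delayed_job itself.

```ruby
# Standalone sketch (assumed names, not the real Delayed::Job / Delayed::Worker):
# models only the precedence rule introduced above.

# Stand-in for Backend::Base#destroy_failed_jobs: nil unless the payload defines the hook.
class SketchJob
  def initialize(payload)
    @payload = payload
  end

  def destroy_failed_jobs
    @payload.destroy_failed_jobs if @payload.respond_to?(:destroy_failed_jobs)
  end
end

# Stand-in for Worker#destroy_failed_jobs(job): nil from the job means
# "fall back to the class-level setting".
class SketchWorker
  class << self
    attr_accessor :destroy_failed_jobs
  end
  self.destroy_failed_jobs = true # mirrors DEFAULT_DESTROY_FAILED_JOBS

  def destroy_failed_jobs(job)
    job.destroy_failed_jobs.nil? ? self.class.destroy_failed_jobs : job.destroy_failed_jobs
  end
end

KeepMeJob = Struct.new(:text) do
  def perform; end

  def destroy_failed_jobs
    false # keep this job's record around after it exhausts its attempts
  end
end

PlainJob = Struct.new(:text) do
  def perform; end
end

worker = SketchWorker.new
worker.destroy_failed_jobs(SketchJob.new(KeepMeJob.new('hi'))) # => false (per-job override wins)
worker.destroy_failed_jobs(SketchJob.new(PlainJob.new('hi')))  # => true  (class-level default)
```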
