# worker.rb (forked from collectiveidea/delayed_job)

require 'timeout'
require 'benchmark' # Benchmark.realtime is used below to time job runs
require 'socket'    # Socket.gethostname is used below for the default worker name
require 'active_support/core_ext/numeric/time'
require 'active_support/core_ext/class/attribute_accessors'
require 'active_support/core_ext/kernel'
require 'logger'

module Delayed
  class Worker
    cattr_accessor :min_priority, :max_priority, :max_attempts, :max_run_time, :default_priority, :sleep_delay, :logger
    self.sleep_delay = 5
    self.max_attempts = 25
    self.max_run_time = 4.hours
    self.default_priority = 0

    # By default, failed jobs are destroyed after too many attempts. If you want to keep them
    # around (perhaps to inspect the reason for the failure), set this to false.
    cattr_accessor :destroy_failed_jobs
    self.destroy_failed_jobs = true
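
    # Illustrative sketch (not shipped configuration): these class-level settings are typically
    # overridden in an app initializer, e.g. a hypothetical config/initializers/delayed_job.rb:
    #
    #   Delayed::Worker.destroy_failed_jobs = false
    #   Delayed::Worker.max_attempts = 10
    #   Delayed::Worker.sleep_delay = 10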

    self.logger = if defined?(Rails)
                    Rails.logger
                  elsif defined?(RAILS_DEFAULT_LOGGER)
                    RAILS_DEFAULT_LOGGER
                  end

    # name_prefix is ignored if name is set directly
    attr_accessor :name_prefix

    cattr_reader :backend

    def self.backend=(backend)
      if backend.is_a? Symbol
        require "delayed/backend/#{backend}"
        backend = "Delayed::Backend::#{backend.to_s.classify}::Job".constantize
      end
      @@backend = backend
      silence_warnings { ::Delayed.const_set(:Job, backend) }
    end

    def self.guess_backend
      self.backend = :active_record if defined?(ActiveRecord)
    end
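
    # Illustrative example of the backend setter above: passing a symbol loads the matching
    # backend file and resolves the job class, which is then also exposed as Delayed::Job.
    #
    #   Delayed::Worker.backend = :active_record
    #   # resolves to Delayed::Backend::ActiveRecord::Job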

    def initialize(options = {})
      @quiet = options[:quiet]
      self.class.min_priority = options[:min_priority] if options.has_key?(:min_priority)
      self.class.max_priority = options[:max_priority] if options.has_key?(:max_priority)
    end

    # Every worker has a unique name, which by default is the pid of the process. There are
    # some advantages to overriding this with something which survives worker restarts:
    # workers can safely resume working on tasks which they locked themselves, on the
    # assumption that the previous worker with the same name crashed.
    def name
      return @name unless @name.nil?
      "#{@name_prefix}host:#{Socket.gethostname} pid:#{Process.pid}" rescue "#{@name_prefix}pid:#{Process.pid}"
    end

    # Sets the name of the worker.
    # Setting the name to nil will revert to the default worker name.
    def name=(val)
      @name = val
    end
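
    # For illustration only: with no name_prefix set, the default name looks roughly like
    # "host:my-machine pid:12345" (the exact hostname and pid depend on the environment).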

    def start
      say "Starting job worker"

      trap('TERM') { say 'Exiting...'; $exit = true }
      trap('INT')  { say 'Exiting...'; $exit = true }

      loop do
        result = nil

        realtime = Benchmark.realtime do
          result = work_off
        end

        count = result.sum

        break if $exit

        if count.zero?
          sleep(@@sleep_delay)
        else
          say "#{count} jobs processed at %.4f j/s, %d failed ..." % [count / realtime, result.last]
        end

        break if $exit
      end
    ensure
      Delayed::Job.clear_locks!(name)
    end

    # Do num jobs and return stats on success/failure.
    # Exit early if interrupted.
    def work_off(num = 100)
      success, failure = 0, 0

      num.times do
        case reserve_and_run_one_job
        when true
          success += 1
        when false
          failure += 1
        else
          break # leave if no work could be done
        end
        break if $exit # leave if we're exiting
      end

      return [success, failure]
    end
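
    # Usage note (illustrative): work_off(10) attempts up to 10 jobs and returns a
    # [successes, failures] pair, e.g. [9, 1]; it stops early once no lockable job is left.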

    def run(job)
      runtime = Benchmark.realtime do
        Timeout.timeout(self.class.max_run_time.to_i) { job.invoke_job }
        job.destroy
      end
      say "#{job.name} completed after %.4f" % runtime
      return true # did work
    rescue Exception => e
      handle_failed_job(job, e)
      return false # work failed
    end

    # Reschedule the job in the future (when a job fails).
    # Uses a polynomial backoff (attempts**4 + 5 seconds) based on the number of failed attempts.
    def reschedule(job, time = nil)
      if (job.attempts += 1) < self.class.max_attempts
        time ||= Job.db_time_now + (job.attempts ** 4) + 5
        job.run_at = time
        job.unlock
        job.save!
      else
        say "PERMANENTLY removing #{job.name} because of #{job.attempts} consecutive failures.", Logger::INFO

        if job.payload_object.respond_to? :on_permanent_failure
          say "Running on_permanent_failure hook"
          failure_method = job.payload_object.method(:on_permanent_failure)
          if failure_method.arity == 1
            failure_method.call(job)
          else
            failure_method.call
          end
        end

        self.class.destroy_failed_jobs ? job.destroy : job.update_attributes(:failed_at => Delayed::Job.db_time_now)
      end
    end
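
    # Worked example of the attempts**4 + 5 backoff above (values in seconds):
    #   attempt 1 -> 6, attempt 2 -> 21, attempt 3 -> 86, attempt 5 -> 630, attempt 10 -> 10,005 (~2.8 hours)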

    def say(text, level = Logger::INFO)
      text = "[Worker(#{name})] #{text}"
      puts text unless @quiet
      logger.add level, "#{Time.now.strftime('%FT%T%z')}: #{text}" if logger
    end

    protected

    def handle_failed_job(job, error)
      job.last_error = error.message + "\n" + error.backtrace.join("\n")
      say "#{job.name} failed with #{error.class.name}: #{error.message} - #{job.attempts} failed attempts", Logger::ERROR
      reschedule(job)
    end

    # Run the next job we can get an exclusive lock on.
    # If no jobs are left we return nil.
    def reserve_and_run_one_job
      # We get up to 5 jobs from the db. If we cannot get exclusive access to a job, we try
      # the next one. This leads to a more even distribution of jobs across the worker processes.
      job = Delayed::Job.find_available(name, 5, self.class.max_run_time).detect do |job|
        if job.lock_exclusively!(self.class.max_run_time, name)
          say "acquired lock on #{job.name}"
          true
        else
          say "failed to acquire exclusive lock for #{job.name}", Logger::WARN
          false
        end
      end

      run(job) if job
    end
  end
end
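
# Minimal usage sketch (illustrative, not part of the library): assumes an ActiveRecord-backed
# app where the delayed_jobs table exists. The options mirror those handled in #initialize above.
#
#   Delayed::Worker.backend = :active_record
#   worker = Delayed::Worker.new(:quiet => false, :min_priority => 0, :max_priority => 10)
#   worker.start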