forked from collectiveidea/delayed_job
/
worker.rb
225 lines (187 loc) · 6.66 KB
/
worker.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
# Standard library
require 'benchmark'
require 'logger'
require 'socket'
require 'timeout'
# ActiveSupport core extensions (cattr_accessor, Numeric#hours, silence_warnings, ...)
require 'active_support/core_ext/numeric/time'
require 'active_support/core_ext/class/attribute_accessors'
require 'active_support/core_ext/kernel'
require 'active_support/core_ext/enumerable'
module Delayed
  # Worker polls the configured backend for jobs, runs them under a timeout,
  # and reschedules or removes jobs that fail. Class-level accessors configure
  # every worker in the process.
  class Worker
    cattr_accessor :min_priority, :max_priority, :max_attempts, :max_run_time, :default_priority, :sleep_delay, :logger, :delay_jobs, :queues, :read_ahead

    self.sleep_delay = 5        # seconds to sleep when no job was found
    self.max_attempts = 25      # default cap before a job is treated as permanently failed
    self.max_run_time = 4.hours # per-job Timeout ceiling (see #run)
    self.default_priority = 0
    self.delay_jobs = true
    self.queues = []
    self.read_ahead = 5

    # Add or remove plugins in this list before the worker is instantiated
    cattr_accessor :plugins
    self.plugins = [Delayed::Plugins::ClearLocks]

    # By default failed jobs are destroyed after too many attempts. If you want to keep them around
    # (perhaps to inspect the reason for the failure), set this to false.
    cattr_accessor :destroy_failed_jobs
    self.destroy_failed_jobs = true

    self.logger = if defined?(Rails)
      Rails.logger
    elsif defined?(RAILS_DEFAULT_LOGGER)
      RAILS_DEFAULT_LOGGER
    end

    # name_prefix is ignored if name is set directly
    attr_accessor :name_prefix

    cattr_reader :backend

    # Install the job backend. Accepts either a backend class or a Symbol
    # (e.g. :active_record), which is resolved to
    # Delayed::Backend::<Camelized>::Job after requiring its support files.
    # Also aliases the chosen backend as ::Delayed::Job.
    def self.backend=(backend)
      if backend.is_a? Symbol
        require "delayed/serialization/#{backend}"
        require "delayed/backend/#{backend}"
        backend = "Delayed::Backend::#{backend.to_s.classify}::Job".constantize
      end
      @@backend = backend
      silence_warnings { ::Delayed.const_set(:Job, backend) }
    end

    def self.guess_backend
      warn "[DEPRECATION] guess_backend is deprecated. Please remove it from your code."
    end

    # Snapshot every open File (once) so .after_fork can re-open them in the
    # child process, then let the backend do its own pre-fork work.
    def self.before_fork
      unless @files_to_reopen
        @files_to_reopen = []
        ObjectSpace.each_object(File) do |file|
          @files_to_reopen << file unless file.closed?
        end
      end
      backend.before_fork
    end

    def self.after_fork
      # Re-open file handles
      @files_to_reopen.each do |file|
        begin
          file.reopen file.path, "a+"
          file.sync = true
        rescue ::Exception
          # NOTE(review): deliberate best-effort — handles that cannot be
          # re-opened (e.g. deleted paths) are skipped silently.
        end
      end
      backend.after_fork
    end

    # Lazily-built, process-wide lifecycle callback registry.
    def self.lifecycle
      @lifecycle ||= Delayed::Lifecycle.new
    end

    # Options (all optional): :quiet (default true), :min_priority,
    # :max_priority, :sleep_delay, :read_ahead, :queues. Note that everything
    # except :quiet is written to the CLASS, so it affects all workers.
    def initialize(options={})
      @quiet = options.has_key?(:quiet) ? options[:quiet] : true
      self.class.min_priority = options[:min_priority] if options.has_key?(:min_priority)
      self.class.max_priority = options[:max_priority] if options.has_key?(:max_priority)
      self.class.sleep_delay = options[:sleep_delay] if options.has_key?(:sleep_delay)
      self.class.read_ahead = options[:read_ahead] if options.has_key?(:read_ahead)
      self.class.queues = options[:queues] if options.has_key?(:queues)
      self.plugins.each { |klass| klass.new }
    end

    # Every worker has a unique name which by default is the pid of the process. There are some
    # advantages to overriding this with something which survives worker restarts: Workers can
    # safely resume working on tasks which are locked by themselves. The worker will assume that
    # it crashed before.
    def name
      return @name unless @name.nil?
      "#{@name_prefix}host:#{Socket.gethostname} pid:#{Process.pid}" rescue "#{@name_prefix}pid:#{Process.pid}"
    end

    # Sets the name of the worker.
    # Setting the name to nil will reset the default worker name
    def name=(val)
      @name = val
    end

    # Main loop: work off batches of jobs until #stop is called (or a
    # TERM/INT signal arrives), sleeping when the queue is empty.
    def start
      trap('TERM') { say 'Exiting...'; stop }
      trap('INT')  { say 'Exiting...'; stop }

      say "Starting job worker"

      self.class.lifecycle.run_callbacks(:execute, self) do
        loop do
          self.class.lifecycle.run_callbacks(:loop, self) do
            result = nil
            realtime = Benchmark.realtime do
              result = work_off
            end

            count = result.sum

            break if @exit

            if count.zero?
              sleep(self.class.sleep_delay)
            else
              say "#{count} jobs processed at %.4f j/s, %d failed ..." % [count / realtime, result.last]
            end
          end

          break if @exit
        end
      end
    end

    def stop
      @exit = true
    end

    # Do num jobs and return stats on success/failure.
    # Exit early if interrupted.
    def work_off(num = 100)
      success, failure = 0, 0

      num.times do
        case reserve_and_run_one_job
        when true
          success += 1
        when false
          failure += 1
        else
          break # leave if no work could be done
        end
        # BUG FIX: the original checked only the global $exit, which #stop
        # never sets (it sets @exit) — so a trapped TERM/INT could be ignored
        # for up to `num` more jobs. Check @exit too; $exit is kept for
        # backward compatibility with wrapper scripts that set it.
        break if @exit || $exit # leave if we're exiting
      end

      return [success, failure]
    end

    # Run a single reserved job under the max_run_time timeout.
    # Returns true on success, false on failure (after the :error callbacks).
    def run(job)
      runtime = Benchmark.realtime do
        Timeout.timeout(self.class.max_run_time.to_i) { job.invoke_job }
        job.destroy
      end
      say "#{job.name} completed after %.4f" % runtime
      return true # did work
    rescue DeserializationError => error
      # BUG FIX: the original wrote "{#{error.message}...", leaving a stray
      # literal "{" at the start of every recorded last_error.
      job.last_error = "#{error.message}\n#{error.backtrace.join("\n")}"
      failed(job)
    rescue Exception => error
      # NOTE(review): rescuing Exception (not StandardError) is deliberate so
      # Timeout and signal-level failures still mark the job as failed, but it
      # also swallows SystemExit — confirm before narrowing.
      self.class.lifecycle.run_callbacks(:error, self, job){ handle_failed_job(job, error) }
      return false # work failed
    end

    # Reschedule the job in the future (when a job fails).
    # Uses an exponential scale depending on the number of failed attempts.
    def reschedule(job, time = nil)
      if (job.attempts += 1) < max_attempts(job)
        time ||= job.reschedule_at
        job.run_at = time
        job.unlock
        job.save!
      else
        say "PERMANENTLY removing #{job.name} because of #{job.attempts} consecutive failures.", Logger::INFO
        failed(job)
      end
    end

    # Run the :failure callbacks/hook, then destroy or mark the job failed
    # depending on destroy_failed_jobs.
    def failed(job)
      self.class.lifecycle.run_callbacks(:failure, self, job) do
        job.hook(:failure)
        self.class.destroy_failed_jobs ? job.destroy : job.fail!
      end
    end

    # Log a line (and echo to stdout unless quiet), tagged with this worker's name.
    def say(text, level = Logger::INFO)
      text = "[Worker(#{name})] #{text}"
      puts text unless @quiet
      logger.add level, "#{Time.now.strftime('%FT%T%z')}: #{text}" if logger
    end

    # Per-job attempt cap, falling back to the class-wide default.
    def max_attempts(job)
      job.max_attempts || self.class.max_attempts
    end

  protected

    # Record the error on the job, log it, and reschedule (or permanently fail).
    def handle_failed_job(job, error)
      # BUG FIX: same stray "{" as in #run's DeserializationError branch.
      job.last_error = "#{error.message}\n#{error.backtrace.join("\n")}"
      say "#{job.name} failed with #{error.class.name}: #{error.message} - #{job.attempts} failed attempts", Logger::ERROR
      reschedule(job)
    end

    # Run the next job we can get an exclusive lock on.
    # If no jobs are left we return nil
    def reserve_and_run_one_job
      job = Delayed::Job.reserve(self)
      self.class.lifecycle.run_callbacks(:perform, self, job){ result = run(job) } if job
    end
  end
end