forked from resque/resque
-
Notifications
You must be signed in to change notification settings - Fork 2
/
job.rb
226 lines (198 loc) · 6.45 KB
/
job.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
module Resque
# A Resque::Job represents a unit of work. Each job lives on a
# single queue and has an associated payload object. The payload
# is a hash with two attributes: `class` and `args`. The `class` is
# the name of the Ruby class which should be used to run the
# job. The `args` are an array of arguments which should be passed
# to the Ruby class's `perform` class-level method.
#
# You can manually run a job using this code:
#
# job = Resque::Job.reserve(:high)
# klass = Resque::Job.constantize(job.payload['class'])
# klass.perform(*job.payload['args'])
class Job
include Helpers
extend Helpers
# Raise Resque::Job::DontPerform from a before_perform hook to
# abort the job.
DontPerform = Class.new(StandardError)
# The worker object which is currently processing this job.
attr_accessor :worker
# The name of the queue from which this job was pulled (or is to be
# placed)
attr_reader :queue
# This job's associated payload object.
attr_reader :payload
def initialize(queue, payload)
@queue = queue
@payload = payload
@failure_hooks_ran = false
end
# Creates a job by placing it on a queue. Expects a string queue
# name, a string class name, and an optional array of arguments to
# pass to the class' `perform` method.
#
# Raises an exception if no queue or class is given.
def self.create(queue, klass, *args)
Resque.validate(klass, queue)
if Resque.inline?
constantize(klass).perform(*decode(encode(args)))
else
Resque.push(queue, :class => klass.to_s, :args => args)
end
end
# Removes a job from a queue. Expects a string queue name, a
# string class name, and, optionally, args.
#
# Returns the number of jobs destroyed.
#
# If no args are provided, it will remove all jobs of the class
# provided.
#
# That is, for these two jobs:
#
# { 'class' => 'UpdateGraph', 'args' => ['defunkt'] }
# { 'class' => 'UpdateGraph', 'args' => ['mojombo'] }
#
# The following call will remove both:
#
# Resque::Job.destroy(queue, 'UpdateGraph')
#
# Whereas specifying args will only remove the 2nd job:
#
# Resque::Job.destroy(queue, 'UpdateGraph', 'mojombo')
#
# This method can be potentially very slow and memory intensive,
# depending on the size of your queue, as it loads all jobs into
# a Ruby array before processing.
def self.destroy(queue, klass, *args)
klass = klass.to_s
queue = "queue:#{queue}"
destroyed = 0
if args.empty?
redis.lrange(queue, 0, -1).each do |string|
if decode(string)['class'] == klass
destroyed += redis.lrem(queue, 0, string).to_i
end
end
else
destroyed += redis.lrem(queue, 0, encode(:class => klass, :args => args))
end
destroyed
end
# Given a string queue name, returns an instance of Resque::Job
# if any jobs are available. If not, returns nil.
def self.reserve(queue)
return unless payload = Resque.pop(queue)
new(queue, payload)
end
# Attempts to perform the work represented by this job instance.
# Calls #perform on the class given in the payload with the
# arguments given in the payload.
def perform
job = payload_class
job_args = args || []
job_was_performed = false
begin
# Execute before_perform hook. Abort the job gracefully if
# Resque::DontPerform is raised.
begin
before_hooks.each do |hook|
job.send(hook, *job_args)
end
rescue DontPerform
return false
end
# Execute the job. Do it in an around_perform hook if available.
if around_hooks.empty?
job.perform(*job_args)
job_was_performed = true
else
# We want to nest all around_perform plugins, with the last one
# finally calling perform
stack = around_hooks.reverse.inject(nil) do |last_hook, hook|
if last_hook
lambda do
job.send(hook, *job_args) { last_hook.call }
end
else
lambda do
job.send(hook, *job_args) do
result = job.perform(*job_args)
job_was_performed = true
result
end
end
end
end
stack.call
end
# Execute after_perform hook
after_hooks.each do |hook|
job.send(hook, *job_args)
end
# Return true if the job was performed
return job_was_performed
# If an exception occurs during the job execution, look for an
# on_failure hook then re-raise.
rescue Object => e
run_failure_hooks(e)
raise e
end
end
# Returns the actual class constant represented in this job's payload.
def payload_class
@payload_class ||= constantize(@payload['class'])
end
# Returns an array of args represented in this job's payload.
def args
@payload['args']
end
# Given an exception object, hands off the needed parameters to
# the Failure module.
def fail(exception)
run_failure_hooks(exception)
Failure.create \
:payload => payload,
:exception => exception,
:worker => worker,
:queue => queue
end
# Creates an identical job, essentially placing this job back on
# the queue.
def recreate
self.class.create(queue, payload_class, *args)
end
# String representation
def inspect
obj = @payload
"(Job{%s} | %s | %s)" % [ @queue, obj['class'], obj['args'].inspect ]
end
# Equality
def ==(other)
queue == other.queue &&
payload_class == other.payload_class &&
args == other.args
end
def before_hooks
@before_hooks ||= Plugin.before_hooks(payload_class)
end
def around_hooks
@around_hooks ||= Plugin.around_hooks(payload_class)
end
def after_hooks
@after_hooks ||= Plugin.after_hooks(payload_class)
end
def failure_hooks
@failure_hooks ||= Plugin.failure_hooks(payload_class)
end
def run_failure_hooks(exception)
begin
failure_hooks.each { |hook| payload_class.send(hook, exception, *Array.wrap(args)) } unless @failure_hooks_ran
ensure
@failure_hooks_ran = true
end
end
end
end