forked from resque/resque
/
resque.rb
369 lines (318 loc) · 10.4 KB
/
resque.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
require 'redis/namespace'
require 'resque/version'
require 'resque/errors'
require 'resque/failure'
require 'resque/failure/base'
require 'resque/helpers'
require 'resque/stat'
require 'resque/job'
require 'resque/worker'
require 'resque/plugin'
module Resque
include Helpers
extend self
# Configures the Redis connection used by Resque.
#
# Accepts:
#   1. A 'hostname:port' String
#   2. A 'hostname:port:db' String (to select the Redis db)
#   3. A 'hostname:port/namespace' String (to set the Redis namespace)
#   4. A Redis URL String 'redis://host:port'
#   5. An instance of `Redis`, `Redis::Client`, `Redis::DistRedis`,
#      or `Redis::Namespace`.
#
# A `Redis::Namespace` is stored untouched. Everything else is wrapped in
# a `Redis::Namespace`; the namespace is `:resque` unless a
# 'hostname:port/namespace' String supplies one (URL Strings always get
# `:resque`).
def redis=(server)
  @redis =
    case server
    when Redis::Namespace
      server
    when String
      namespace = nil
      connection =
        if server =~ /redis\:\/\//
          Redis.connect(:url => server, :thread_safe => true)
        else
          # Everything after the first '/' is the namespace; the part
          # before it is 'host:port' with an optional ':db' suffix.
          address, namespace = server.split('/', 2)
          host, port, db = address.split(':')
          Redis.new(:host => host, :port => port,
                    :thread_safe => true, :db => db)
        end
      Redis::Namespace.new(namespace || :resque, :redis => connection)
    else
      Redis::Namespace.new(:resque, :redis => server)
    end
end
# Returns the current Redis connection, creating one on first use.
#
# When nothing has been assigned yet, the default goes through `redis=`:
# `Redis.connect` if this redis-rb offers it, otherwise the
# "localhost:6379" String.
def redis
  unless @redis
    default_server = Redis.respond_to?(:connect) ? Redis.connect : "localhost:6379"
    self.redis = default_server
  end
  @redis
end
# Returns an identifier for the Redis server(s) behind the current
# connection. Which accessor is used depends on what the connection
# object responds to.
def redis_id
  connection = redis

  # support 1.x versions of redis-rb
  return connection.server if connection.respond_to?(:server)

  # distributed: list every node's id
  if connection.respond_to?(:nodes)
    return connection.nodes.map { |node| node.id }.join(', ')
  end

  connection.client.id
end
# The `before_first_fork` hook will be run in the **parent** process
# only once, before forking to run the first job. Be careful- any
# changes you make will be permanent for the lifespan of the
# worker.
#
# Call with a block to set the hook.
# Call with no arguments to return the hook.
#
# Either way the currently stored hook is returned (nil if never set).
def before_first_fork(&block)
  @before_first_fork = block if block
  @before_first_fork
end
# Set a proc that will be called in the parent process before the
# worker forks for the first time.
#
# Assignment form of the hook: `Resque.before_first_fork = some_proc`
# writes @before_first_fork, the same variable the block form of
# `before_first_fork` reads and writes.
attr_writer :before_first_fork
# The `before_fork` hook will be run in the **parent** process
# before every job, so be careful- any changes you make will be
# permanent for the lifespan of the worker.
#
# Call with a block to set the hook.
# Call with no arguments to return the hook.
#
# Either way the currently stored hook is returned (nil if never set).
def before_fork(&block)
  @before_fork = block if block
  @before_fork
end
# Set the before_fork proc.
#
# Assignment form of the hook: `Resque.before_fork = some_proc` writes
# @before_fork, the same variable the block form of `before_fork` reads
# and writes.
attr_writer :before_fork
# The `after_fork` hook will be run in the child process and is passed
# the current job. Any changes you make, therefore, will only live as
# long as the job currently being processed.
#
# Call with a block to set the hook.
# Call with no arguments to return the hook.
#
# Either way the currently stored hook is returned (nil if never set).
def after_fork(&block)
  @after_fork = block if block
  @after_fork
end
# Set the after_fork proc.
#
# Assignment form of the hook: `Resque.after_fork = some_proc` writes
# @after_fork, the same variable the block form of `after_fork` reads
# and writes.
attr_writer :after_fork
# Human-readable description of this client, naming the Redis server(s)
# reported by `redis_id`.
def to_s
  server_id = redis_id
  "Resque Client connected to #{server_id}"
end
# If 'inline' is true Resque will call #perform method inline
# without queuing it into Redis and without any Resque callbacks.
# If 'inline' is false Resque jobs will be put in queue regularly.
attr_accessor :inline
# Predicate-style reader: `inline?` is an alias of the `inline` getter.
alias :inline? :inline
#
# queue manipulation
#
# Pushes a job onto a queue. Queue name should be a string and the
# item should be any JSON-able Ruby object.
#
# Resque workers generally expect the `item` to be a hash with the
# following keys:
#
#   class - The String name of the job to run.
#    args - An Array of arguments to pass the job. Usually passed
#           via `class.to_class.perform(*args)`.
#
# Example
#
#   Resque.push('archive', :class => 'Archive', :args => [ 35, 'tar' ])
#
# The queue is registered via `watch_queue` before the encoded item is
# appended to the "queue:<name>" list.
#
# Returns the result of the underlying `rpush` call; callers should not
# rely on it.
def push(queue, item)
  watch_queue(queue)
  list_key = "queue:#{queue}"
  redis.rpush(list_key, encode(item))
end
# Pops a job off a queue. Queue name should be a string.
#
# Takes the head of the "queue:<name>" list and runs it through `decode`.
#
# Returns a Ruby object.
def pop(queue)
  payload = redis.lpop("queue:#{queue}")
  decode(payload)
end
# Returns an integer representing the size of a queue.
# Queue name should be a string.
#
# The Redis reply is coerced with `to_i`, so a nil reply counts as 0.
def size(queue)
  length = redis.llen("queue:#{queue}")
  length.to_i
end
# Returns the items currently queued without removing them. Queue name
# should be a string.
#
# start and count should be integers and can be used for pagination.
# start is the zero-based index of the first item, count is how many
# items to return.
#
# To get the 3rd page of a paginated list with 30 items per page one
# would use:
#   Resque.peek('my_list', 60, 30)
#
# Delegates to `list_range`, so with the default count of 1 a single
# item is returned rather than an array.
def peek(queue, start = 0, count = 1)
  list_key = "queue:#{queue}"
  list_range(list_key, start, count)
end
# Does the dirty work of fetching a range of items from a Redis list
# and converting them into Ruby objects.
#
# With count == 1 the single item at `start` is returned decoded (not
# wrapped in an array). Otherwise returns an array of the decoded items
# from `start` through `start + count - 1`; a nil reply yields [].
def list_range(key, start = 0, count = 1)
  return decode(redis.lindex(key, start)) if count == 1

  raw_items = redis.lrange(key, start, start + count - 1)
  Array(raw_items).map { |raw| decode(raw) }
end
# Returns an array of all known Resque queues as strings.
#
# Reads the members of the `:queues` set; a nil reply becomes [].
def queues
  names = redis.smembers(:queues)
  Array(names)
end
# Given a queue name, completely deletes the queue.
#
# Removes the name from the `:queues` set first, then deletes the
# "queue:<name>" list itself. Returns the result of the `del` call.
def remove_queue(queue)
  store = redis
  store.srem(:queues, queue.to_s)
  store.del("queue:#{queue}")
end
# Used internally to keep track of which queues we've created.
# Don't call this directly.
#
# Adds the queue's name (converted with `to_s`) to the `:queues` set,
# which is the set `queues` reads and `remove_queue` prunes.
def watch_queue(queue)
redis.sadd(:queues, queue.to_s)
end
#
# job shortcuts
#
# This method can be used to conveniently add a job to a queue.
# It assumes the class you're passing it is a real Ruby class (not
# a string or reference) which either:
#
#   a) has a @queue ivar set
#   b) responds to `queue`
#
# If either of those conditions are met, it will use the value obtained
# from performing one of the above operations to determine the queue.
#
# If no queue can be inferred this method will raise a
# `Resque::NoQueueError`.
# NOTE(review): the raise is not in this method; presumably it happens
# further down the `enqueue_to` / `Job.create` path - confirm.
#
# Returns true if the job was queued, nil if the job was rejected by a
# before_enqueue hook.
#
# This method is considered part of the `stable` API.
def enqueue(klass, *args)
  target_queue = queue_from_class(klass)
  enqueue_to(target_queue, klass, *args)
end
# Just like `enqueue` but allows you to specify the queue you want to
# use. Runs hooks.
#
# `queue` should be the String name of the queue you're targeting.
#
# Every before_enqueue hook is run (none is skipped) before the results
# are inspected; the job is created only if no hook returned `false`.
# After creation every after_enqueue hook is run with the same args.
#
# Returns true if the job was queued, nil if the job was rejected by a
# before_enqueue hook.
#
# This method is considered part of the `stable` API.
def enqueue_to(queue, klass, *args)
  hook_results = Plugin.before_enqueue_hooks(klass).map do |hook_name|
    klass.send(hook_name, *args)
  end
  # Only an explicit `false` vetoes the enqueue; nil does not.
  return nil if hook_results.include?(false)

  Job.create(queue, klass, *args)

  Plugin.after_enqueue_hooks(klass).each { |hook_name| klass.send(hook_name, *args) }

  true
end
# This method can be used to conveniently remove a job from a queue.
# It assumes the class you're passing it is a real Ruby class (not
# a string or reference) which either:
#
#   a) has a @queue ivar set
#   b) responds to `queue`
#
# If either of those conditions are met, it will use the value obtained
# from performing one of the above operations to determine the queue.
#
# If no args are given, this method will dequeue *all* jobs matching
# the provided class. See `Resque::Job.destroy` for more
# information.
#
# Every before_dequeue hook is run before the results are inspected;
# nothing is destroyed if any of them returned `false`. After the jobs
# are destroyed every after_dequeue hook is run with the same args.
#
# Returns the value of `Job.destroy` (the number of jobs destroyed), or
# nil if a before_dequeue hook rejected the dequeue.
#
# Example:
#
#   # Removes all jobs of class `UpdateNetworkGraph`
#   Resque.dequeue(GitHub::Jobs::UpdateNetworkGraph)
#
#   # Removes all jobs of class `UpdateNetworkGraph` with matching args.
#   Resque.dequeue(GitHub::Jobs::UpdateNetworkGraph, 'repo:135325')
#
# This method is considered part of the `stable` API.
def dequeue(klass, *args)
  # Perform before_dequeue hooks. Don't perform dequeue if any hook returns false
  before_hooks = Plugin.before_dequeue_hooks(klass).collect do |hook|
    klass.send(hook, *args)
  end
  return if before_hooks.any? { |result| result == false }

  # Hold on to the count: the after hooks run last, and without this the
  # method would return the hook list from `each` instead.
  destroyed = Job.destroy(queue_from_class(klass), klass, *args)

  Plugin.after_dequeue_hooks(klass).each do |hook|
    klass.send(hook, *args)
  end

  destroyed
end
# Given a class, try to extrapolate an appropriate queue based on a
# class instance variable or `queue` method.
#
# The @queue ivar wins when it is set to a truthy value. Otherwise the
# result is `klass.queue` if klass responds to `queue`, and false if it
# does not.
def queue_from_class(klass)
  from_ivar = klass.instance_variable_get(:@queue)
  return from_ivar if from_ivar

  klass.respond_to?(:queue) && klass.queue
end
# This method will return a `Resque::Job` object or a non-true value
# depending on whether a job can be obtained. You should pass it the
# precise name of a queue: case matters.
#
# Thin wrapper: hands `queue` to `Job.reserve` and returns its result
# unchanged.
#
# This method is considered part of the `stable` API.
def reserve(queue)
Job.reserve(queue)
end
# Validates if the given klass could be a valid Resque job
#
# When `queue` is not given it is inferred with `queue_from_class`.
#
# If no queue can be inferred this method will raise a `Resque::NoQueueError`
#
# If given klass is nil (or its `to_s` is otherwise empty) this method
# will raise a `Resque::NoClassError`. The queue check runs first, so a
# missing queue is reported before a missing class.
#
# Returns nil when both checks pass.
def validate(klass, queue = nil)
  queue ||= queue_from_class(klass)

  raise NoQueueError.new("Jobs must be placed onto a queue.") unless queue
  raise NoClassError.new("Jobs must be given a class.") if klass.to_s.empty?
end
#
# worker shortcuts
#
# A shortcut to Worker.all
#
# Returns whatever `Worker.all` returns, unchanged.
def workers
Worker.all
end
# A shortcut to Worker.working
#
# Returns whatever `Worker.working` returns, unchanged.
def working
Worker.working
end
# A shortcut to unregister_worker
# useful for command line tool
#
# Looks the worker up with `Resque::Worker.find` and calls
# `unregister_worker` on the result, returning that call's value.
# NOTE(review): if `find` can return nil for an unknown id this raises
# NoMethodError - confirm against Worker.find.
def remove_worker(worker_id)
  Resque::Worker.find(worker_id).unregister_worker
end
#
# stats
#
# Returns a hash, similar to redis-rb's #info, of interesting stats.
#
# Keys:
#   :pending     - sum of `size` over every known queue
#   :processed   - Stat[:processed]
#   :queues      - number of known queues
#   :workers     - number of workers
#   :working     - number of workers reported by `working`
#   :failed      - Stat[:failed]
#   :servers     - one-element array holding `redis_id`
#   :environment - RAILS_ENV, else RACK_ENV, else 'development'
def info
  pending_jobs = queues.inject(0) { |total, name| total + size(name) }

  {
    :pending     => pending_jobs,
    :processed   => Stat[:processed],
    :queues      => queues.size,
    :workers     => workers.size.to_i,
    :working     => working.size,
    :failed      => Stat[:failed],
    :servers     => [redis_id],
    :environment => ENV['RAILS_ENV'] || ENV['RACK_ENV'] || 'development'
  }
end
# Returns an array of all known Resque keys in Redis. Redis' KEYS operation
# is O(N) for the keyspace, so be careful - this can be slow for big databases.
#
# A leading "<namespace>:" is removed from each key. Only a prefix is
# stripped: a key that merely contains the namespace text further in
# (e.g. "stat:resque:processed") is returned unchanged.
# NOTE(review): some redis-namespace versions appear to strip the prefix
# from KEYS replies themselves, in which case this is a no-op - confirm.
def keys
  # Build the prefix once instead of re-reading the namespace per key.
  prefix = "#{redis.namespace}:"
  redis.keys("*").map do |key|
    key.start_with?(prefix) ? key[prefix.length..-1] : key
  end
end
end