forked from resque/resque-scheduler
/
resque_scheduler.rb
120 lines (103 loc) · 3.97 KB
/
resque_scheduler.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
require 'rubygems'
require 'resque'
require 'resque/server'
require 'resque_scheduler/version'
require 'resque/scheduler'
require 'resque_scheduler/server'
# Mixin providing the schedule configuration accessors and the delayed-job
# queue primitives. Every Redis-backed method relies on the object that extends
# this module to supply +redis+, +encode+, +decode+ and +list_range+.
module ResqueScheduler
  #
  # Accepts a new schedule configuration of the form:
  #
  #   {some_name => {"cron"        => "*/5 * * * *",
  #                  "class"       => DoSomeWork,
  #                  "args"        => "work on this string",
  #                  "description" => "this thing works its butter off"},
  #    ...}
  #
  # :name can be anything and is used only to describe the scheduled job
  # :cron can be any cron scheduling string
  # :class must be a resque worker (job) class
  # :args can be any yaml which will be converted to a ruby literal and
  #   passed in as params (optional). If args is an array, each element in
  #   the array is passed as a separate param, otherwise args is passed in
  #   as the only parameter to perform.
  # :description is just that, a description of the job (optional).
  #
  # The hash is stored as given; this method performs no validation.
  def schedule=(schedule_hash)
    @schedule = schedule_hash
  end

  # Returns the schedule hash, defaulting to (and memoizing) an empty hash
  # when no schedule has been assigned yet.
  def schedule
    @schedule ||= {}
  end

  # This method is nearly identical to +enqueue+ only it also
  # takes a timestamp which will be used to schedule the job
  # for queueing. Until timestamp is in the past, the job will
  # sit in the schedule list.
  #
  # The job is stored as a hash with the class name (+klass.to_s+) and the
  # argument list.
  def enqueue_at(timestamp, klass, *args)
    delayed_push(timestamp, :class => klass.to_s, :args => args)
  end

  # Identical to enqueue_at but takes number_of_seconds_from_now
  # instead of a timestamp.
  def enqueue_in(number_of_seconds_from_now, klass, *args)
    enqueue_at(Time.now + number_of_seconds_from_now, klass, *args)
  end

  # Used internally to stuff the item into the schedule sorted list.
  # +timestamp+ can be either in seconds or a datetime object (anything
  # responding to +to_i+).
  # Insertion is O(log(n)).
  #
  # Returns the result of the underlying +zadd+ call.
  # NOTE(review): the original documentation says this is true when it is the
  # first job scheduled at that time, else false -- that depends on the Redis
  # client in use; confirm.
  def delayed_push(timestamp, item)
    seconds = timestamp.to_i

    # First add this item to the list for this timestamp
    redis.rpush("delayed:#{seconds}", encode(item))

    # Now, add this timestamp to the zsets. The score and the value are
    # the same since we'll be querying by timestamp, and we don't have
    # anything else to store.
    redis.zadd :delayed_queue_schedule, seconds, seconds
  end

  # Returns an array of at most +count+ timestamps (as integers), beginning
  # at index +start+ of the delayed queue schedule.
  #
  # A +count+ of zero or less yields an empty array.
  def delayed_queue_peek(start, count)
    # Guard first: with start == 0 a stop index of -1 would mean "through the
    # last element" and return the whole schedule.
    return [] unless count > 0

    # ZRANGE treats both indexes as inclusive, so the last wanted index is
    # start + count - 1 (start + count would return one timestamp too many).
    Array(redis.zrange(:delayed_queue_schedule, start, start + count - 1)).map(&:to_i)
  end

  # Returns the size of the delayed queue schedule
  def delayed_queue_schedule_size
    redis.zcard :delayed_queue_schedule
  end

  # Returns the number of jobs for a given timestamp in the delayed queue schedule
  def delayed_timestamp_size(timestamp)
    redis.llen("delayed:#{timestamp.to_i}").to_i
  end

  # Returns an array of delayed items for the given timestamp
  #
  # When +count+ is 1 the +list_range+ result is treated as a single item (or
  # nil) and is wrapped so that callers always receive an array.
  def delayed_timestamp_peek(timestamp, start, count)
    found = list_range "delayed:#{timestamp.to_i}", start, count
    return found unless 1 == count

    found.nil? ? [] : [found]
  end

  # Returns the next delayed queue timestamp
  # (don't call directly)
  #
  # Only timestamps that are not in the future (score <= now) are considered;
  # returns nil when there is none.
  def next_delayed_timestamp
    timestamp = redis.zrangebyscore(:delayed_queue_schedule, '-inf', Time.now.to_i, 'limit', 0, 1).first
    timestamp.to_i unless timestamp.nil?
  end

  # Returns the next item to be processed for a given timestamp, nil if
  # done. (don't call directly)
  # +timestamp+ can either be in seconds or a datetime
  #
  # NOTE(review): the pop, the length check and the cleanup are separate Redis
  # calls; confirm whether concurrent callers need to be considered.
  def next_item_for_timestamp(timestamp)
    seconds = timestamp.to_i
    key = "delayed:#{seconds}"

    item = decode redis.lpop(key)

    # If the list is empty, remove it.
    if redis.llen(key).to_i.zero?
      redis.del key
      redis.zrem :delayed_queue_schedule, seconds
    end

    item
  end

  # Records +timestamp+ under the key "cron_<name>" via +getset+ and returns
  # true when the previously stored value differs from it (including when
  # nothing was stored before), false when the same timestamp was already
  # recorded.
  def cron_not_done?(name, timestamp)
    stamp = timestamp.to_i
    redis.getset("cron_#{name}", stamp).to_s != stamp.to_s
  end
end
# Expose the scheduler methods directly on Resque (e.g. Resque.enqueue_at).
Resque.extend(ResqueScheduler)

# Mix ResqueScheduler::Server into Resque::Server. class_eval is used so that
# +include+ is invoked from inside the class.
Resque::Server.class_eval { include ResqueScheduler::Server }