Skip to content

Commit

Permalink
Terminology: message -> job
Browse files Browse the repository at this point in the history
  • Loading branch information
mperham committed Oct 6, 2014
1 parent d54dc1c commit 3016747
Showing 1 changed file with 9 additions and 9 deletions.
18 changes: 9 additions & 9 deletions lib/sidekiq/scheduled.rb
Expand Up @@ -9,10 +9,10 @@ module Scheduled
INITIAL_WAIT = 10

##
# The Poller checks Redis every N seconds for messages in the retry or scheduled
# The Poller checks Redis every N seconds for jobs in the retry or scheduled
# set have passed their timestamp and should be enqueued. If so, it
# just pops the message back onto its original queue so the
# workers can pick it up like any other message.
# just pops the job back onto its original queue so the
# workers can pick it up like any other job.
class Poller
include Util
include Actor
Expand All @@ -24,23 +24,23 @@ def poll(first_time=false)
initial_wait if first_time

begin
# A message's "score" in Redis is the time at which it should be processed.
# Just check Redis for the set of messages with a timestamp before now.
# A job's "score" in Redis is the time at which it should be processed.
# Just check Redis for the set of jobs with a timestamp before now.
now = Time.now.to_f.to_s
Sidekiq.redis do |conn|
SETS.each do |sorted_set|
# Get the next item in the queue if it's score (time to execute) is <= now.
# We need to go through the list one at a time to reduce the risk of something
# going wrong between the time jobs are popped from the scheduled queue and when
# they are pushed onto a work queue and losing the jobs.
while message = conn.zrangebyscore(sorted_set, '-inf', now, :limit => [0, 1]).first do
while job = conn.zrangebyscore(sorted_set, '-inf', now, :limit => [0, 1]).first do

# Pop item off the queue and add it to the work queue. If the job can't be popped from
# the queue, it's because another process already popped it so we can move on to the
# next one.
if conn.zrem(sorted_set, message)
Sidekiq::Client.push(Sidekiq.load_json(message))
logger.debug { "enqueued #{sorted_set}: #{message}" }
if conn.zrem(sorted_set, job)
Sidekiq::Client.push(Sidekiq.load_json(job))
logger.debug { "enqueued #{sorted_set}: #{job}" }
end
end
end
Expand Down

0 comments on commit 3016747

Please sign in to comment.