Skip to content

Commit

Permalink
Extract enqueue logic, sidekiq#2159
Browse files Browse the repository at this point in the history
  • Loading branch information
mperham committed Jan 27, 2015
1 parent 24b35ec commit 339571a
Showing 1 changed file with 31 additions and 23 deletions.
54 changes: 31 additions & 23 deletions lib/sidekiq/scheduled.rb
Expand Up @@ -5,8 +5,32 @@

module Sidekiq
module Scheduled
SETS = %w(retry schedule)

INITIAL_WAIT = 10
class Enq
def enqueue_jobs(now=Time.now.to_f.to_s, sorted_sets=SETS)
# A job's "score" in Redis is the time at which it should be processed.
# Just check Redis for the set of jobs with a timestamp before now.
Sidekiq.redis do |conn|
sorted_sets.each do |sorted_set|
# Get the next item in the queue if it's score (time to execute) is <= now.
# We need to go through the list one at a time to reduce the risk of something
# going wrong between the time jobs are popped from the scheduled queue and when
# they are pushed onto a work queue and losing the jobs.
while job = conn.zrangebyscore(sorted_set, '-inf', now, :limit => [0, 1]).first do

# Pop item off the queue and add it to the work queue. If the job can't be popped from
# the queue, it's because another process already popped it so we can move on to the
# next one.
if conn.zrem(sorted_set, job)
Sidekiq::Client.push(Sidekiq.load_json(job))
Sidekiq::Logging.logger.debug { "enqueued #{sorted_set}: #{job}" }
end
end
end
end
end
end

##
# The Poller checks Redis every N seconds for jobs in the retry or scheduled
Expand All @@ -17,34 +41,18 @@ class Poller
include Util
include Actor

SETS = %w(retry schedule)
INITIAL_WAIT = 10

def initialize
@enq = (Sidekiq.options[:poll_enq] || Sidekiq::Scheduled::Enq).new
end

def poll(first_time=false)
watchdog('scheduling poller thread died!') do
initial_wait if first_time

begin
# A job's "score" in Redis is the time at which it should be processed.
# Just check Redis for the set of jobs with a timestamp before now.
now = Time.now.to_f.to_s
Sidekiq.redis do |conn|
SETS.each do |sorted_set|
# Get the next item in the queue if it's score (time to execute) is <= now.
# We need to go through the list one at a time to reduce the risk of something
# going wrong between the time jobs are popped from the scheduled queue and when
# they are pushed onto a work queue and losing the jobs.
while job = conn.zrangebyscore(sorted_set, '-inf', now, :limit => [0, 1]).first do

# Pop item off the queue and add it to the work queue. If the job can't be popped from
# the queue, it's because another process already popped it so we can move on to the
# next one.
if conn.zrem(sorted_set, job)
Sidekiq::Client.push(Sidekiq.load_json(job))
logger.debug { "enqueued #{sorted_set}: #{job}" }
end
end
end
end
@enq.enqueue_jobs
rescue => ex
# Most likely a problem with redis networking.
# Punt and try again at the next interval
Expand Down

0 comments on commit 339571a

Please sign in to comment.